package com.bootdo.rabbitMQ.consumer;

import com.alibaba.fastjson.JSONObject;
import com.bootdo.rabbitMQ.entity.MessageVo;
import com.bootdo.web.service.SendSMSService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.nio.charset.StandardCharsets;


/**
 * RabbitMQ consumer configuration for SMS push messages.
 *
 * <p>Original description: user message push; one message can only be received by one user.
 *
 * <p>Registers a {@link SimpleMessageListenerContainer} on the {@code queueSmsPush} queue that
 * parses each message, sends the SMS through {@link SendSMSService} and acknowledges the
 * delivery manually. Failed messages are requeued with a linearly growing back-off and are
 * dropped once the failure counter exceeds {@link #MAX_FAILURES}.
 *
 * @Author wukq
 * @Date: 2020/4/13
 */
@Configuration
@EnableRabbit
public class RabbitSMSPushConsumer implements RabbitListenerConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(RabbitSMSPushConsumer.class);

    /** Number of consecutive failures that are retried; the next failure drops the message. */
    private static final int MAX_FAILURES = 10;

    /** Base back-off in milliseconds; the actual wait is this value times the failure count. */
    private static final long RETRY_BACKOFF_MILLIS = 30000L;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private SendSMSService sendSMSService;

    /**
     * Creates the handler-method factory that is handed to the listener registrar in
     * {@link #configureRabbitListeners(RabbitListenerEndpointRegistrar)}.
     *
     * @return a factory using a {@link MappingJackson2MessageConverter}
     */
    @Bean
    public DefaultMessageHandlerMethodFactory queueSmsPushFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    /**
     * Builds the listener container that consumes the {@code queueSmsPush} queue.
     *
     * <p>The container uses manual acknowledgement. Every delivery is settled exactly once:
     * acked after a successful send, acked (dropped) once the failure counter exceeds
     * {@link #MAX_FAILURES}, or otherwise nacked with requeue after a back-off sleep.
     *
     * <p>{@code @Qualifier} selects which {@link Queue} bean is injected when several
     * candidates of the same type exist.
     *
     * @param queue the queue bean named {@code queueSmsPush}
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer queueSmsPushContainer(@Qualifier("queueSmsPush") Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMaxConcurrentConsumers(1);
        container.setDefaultRequeueRejected(true); // requeue rejected messages
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // listener acks/nacks explicitly
        container.setMessageListener(new ChannelAwareMessageListener() {
            // Failure counter kept on the listener instance (not per message); it is reset
            // only when a message is dropped after too many failures.
            // NOTE(review): not synchronized; relies on the container running one consumer.
            private int failureCount = 0;

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String bodyString = new String(message.getBody(), StandardCharsets.UTF_8);
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                logger.info("rabbitmq 接收到短信推送的消息体 :{}", bodyString);

                try {
                    MessageVo messageVo = JSONObject.parseObject(bodyString, MessageVo.class);
                    JSONObject jsonObject = JSONObject.parseObject(messageVo.getMsg());
                    String receivePhone = jsonObject.getString("receivePhone");
                    String content = jsonObject.getString("content");
                    sendSMSService.sendMsg(receivePhone, content); // call the SMS gateway
                    channel.basicAck(deliveryTag, false); // confirm successful consumption
                } catch (Exception e) {
                    logger.error("exception--->", e);
                    failureCount++;
                    if (failureCount > MAX_FAILURES) {
                        logger.warn("<------------- queueMessagePush : [{}], 消息出错,失败次数{},  跳过发送此消息, 进入下一条--------------->",
                                bodyString, failureCount);
                        failureCount = 0;
                        // Ack to discard the message, then stop: a delivery tag must be settled
                        // only once, so it must not also be nacked below.
                        channel.basicAck(deliveryTag, false);
                        return;
                    }
                    logger.error("消息推送记录出错：{}\r\n重新发送,次数:{}", e.getMessage(), failureCount);
                    // Linear back-off before handing the message back to the broker.
                    Thread.sleep(RETRY_BACKOFF_MILLIS * failureCount);
                    channel.basicNack(deliveryTag, false, true); // requeue for another attempt
                }
            }
        });
        return container;
    }

    /**
     * Installs the handler-method factory from {@link #queueSmsPushFactory()} on the registrar.
     *
     * @param rabbitListenerEndpointRegistrar the registrar supplied by Spring AMQP
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(queueSmsPushFactory());
    }
}
